package com.example.networkflow;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class HotPages {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //URL resourcePath = HotPages.class.getResource("/apache.log");
        //DataStream<String> dataStream = env.readTextFile(resourcePath.getPath());
        DataStream<String> dataStream = env.socketTextStream("localhost", 7777);

        DataStream<ApacheLogEvent> logEventDataStream = dataStream.map(line -> {
            //93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
            String[] fields = line.split(" ");
            String ip = fields[0];
            String userId = fields[1];
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
            Long tiemstamp = sdf.parse(fields[3]).getTime();
            String method = fields[5];
            String url = fields[6];
            ApacheLogEvent logEvent = new ApacheLogEvent(ip, userId, tiemstamp, method, url);
            return logEvent;
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
                return apacheLogEvent.getTiemstamp();
            }
        });
        //定义一个侧输出流标签
        OutputTag lateTag = new OutputTag<ApacheLogEvent>("late") {

        };

        //分组开窗
        SingleOutputStreamOperator<PageViewCount> windowAggStream = logEventDataStream
                .filter(logEvent -> "GET".equalsIgnoreCase(logEvent.getMethod()))
                .filter(logEvent -> {
                    String regex = "^((?!\\.(css|js|png|ico)$).)*$";
                    return Pattern.matches(regex, logEvent.getUrl());
                })
                .keyBy(ApacheLogEvent::getUrl)
                .timeWindow(Time.minutes(10), Time.seconds(5))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateTag)
                .aggregate(new PageCountAgg(), new PageCountResult());


        windowAggStream.print("agg");
        windowAggStream.getSideOutput(lateTag).print("late");

        //收集同一窗口count数据，排序输出
        DataStream<String> resultStream = windowAggStream.keyBy(PageViewCount::getWindowEnd)
                .process(new TOpNHotPages(5));

        resultStream.print();
        env.execute("job-hot pages");
    }

    //自定义预聚合函数，增量聚合
    public static class PageCountAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ApacheLogEvent value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    //实现自定义的窗口函数， IN, OUT, KEY (跟前面keyBy的输出有关), W
    public static class PageCountResult implements WindowFunction<Long, PageViewCount, String, TimeWindow> {
        @Override
        public void apply(String url, TimeWindow timeWindow, Iterable<Long> iterable, Collector<PageViewCount> collector) throws Exception {
            PageViewCount pageViewCount = new PageViewCount();
            pageViewCount.setUrl(url);
            pageViewCount.setWindowEnd(timeWindow.getEnd());
            pageViewCount.setCount(iterable.iterator().next());
            collector.collect(pageViewCount);
        }
    }

    //自定义所有数据到达之后的排序输出。
    public static class TOpNHotPages extends KeyedProcessFunction<Long, PageViewCount, String> {
        private Integer topSize;

        public TOpNHotPages(Integer topSize) {
            this.topSize = topSize;
        }

        //定义状态，保存当前所有的OA个ViewCount到list
        ListState<PageViewCount> pageViewCountListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            pageViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<PageViewCount>("page-count-list", PageViewCount.class));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context, Collector<String> collector) throws Exception {
            pageViewCountListState.add(pageViewCount);

            context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<PageViewCount> list = Lists.newArrayList(pageViewCountListState.get());
            list.sort((o1, o2) -> {
                if (o1.getCount() > o2.getCount()) {
                    return -1;
                } else if (o1.getCount() < o2.getCount()) {
                    return 1;
                } else {
                    return 0;
                }
            });


            //格式化输出
            //排名信息打印输出
            StringBuilder resultBuilder = new StringBuilder();
            resultBuilder.append("==================").append("\n");
            resultBuilder.append("窗口结束时间:").append(new Timestamp(timestamp - 1))
                    .append("\n");
            //遍历列表，取TopN
            for (int i = 0; i < Math.min(topSize, list.size()); i++) {
                PageViewCount currentItemViewCount = list.get(i);
                resultBuilder
                        .append("No ").append(i + 1).append(": ")
                        .append(" 页面URL = ")
                        .append(currentItemViewCount.getUrl())
                        .append(" 浏览量 = ").append(currentItemViewCount.getCount())
                        .append("\n");
            }

            resultBuilder.append("===============\n\n");

            //控制输出评率
            Thread.sleep(1000L);

            out.collect(resultBuilder.toString());
        }

    }
}
